package com.demo1.common;

import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamInfo;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * Redis Stream listener that logs every received record and then, for each
 * consumer group reported for the record's stream, acknowledges the record
 * before deleting it from the stream.
 *
 * <p>The {@link RedisStreamUtil} collaborator lives in a public static field that
 * this class never assigns itself; other code must set it before the first
 * message is delivered.
 *
 * @author wangxikai
 * @date 2024/5/29
 */
@Slf4j
@Component
public class ReportReadMqListener implements StreamListener<String, MapRecord<String, String, String>> {

    /** Helper used to query groups, ack and delete records; assigned from outside this class. */
    public static RedisStreamUtil redisStreamUtil;

    /**
     * Handles one record delivered from a Redis Stream.
     *
     * @param message the record, carrying its stream key, record id and field/value payload
     * @throws IllegalStateException if {@link #redisStreamUtil} has not been assigned yet
     */
    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        // Key of the stream the record was read from.
        String streamKey = message.getStream();
        // Id of the record.
        RecordId recordId = message.getId();
        // Payload of the record.
        Map<String, String> msg = message.getValue();
        // TODO business processing goes here

        // Parameterized logging: same rendered text, no eager string concatenation.
        log.info("【streamKey】= {},【recordId】= {},【msg】={}", streamKey, recordId, msg);

        // Snapshot the static field once so the check and the calls below see the same instance.
        final RedisStreamUtil streamUtil = redisStreamUtil;
        if (streamUtil == null) {
            // Fail with an explicit message instead of an anonymous NullPointerException.
            throw new IllegalStateException(
                    "ReportReadMqListener.redisStreamUtil has not been assigned; cannot ack/delete record "
                            + recordId + " on stream " + streamKey);
        }

        // Once processing is done: ack the record for every group returned for this stream
        // (groupName() is the consumer group name), then delete the record.
        final String recordIdValue = recordId.getValue();
        StreamInfo.XInfoGroups xInfoGroups = streamUtil.queryGroups(streamKey);
        xInfoGroups.forEach(xInfoGroup -> streamUtil.ack(streamKey, xInfoGroup.groupName(), recordIdValue));
        streamUtil.del(streamKey, recordIdValue);
    }
}